package com.atguigu.bigdata.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

/**
 * Spark Streaming demo that counts identical lines received from a socket, per window.
 *
 * <p>Every line read from {@code localhost:9999} is turned into a {@code (line, 1)} pair. The
 * pairs are grouped into a 6-second window that slides every 6 seconds, the counts are summed
 * per key, and the result of each window is printed.
 */
public class SparkStreaming06_State_Window_JAVA {

    /** Maps an incoming line to a {@code (line, 1)} pair so it can be counted by key. */
    private static final class LineToOne implements PairFunction<String, String, Integer> {
        @Override
        public Tuple2<String, Integer> call(String line) throws Exception {
            return new Tuple2<String, Integer>(line, 1);
        }
    }

    /** Adds two partial counts that belong to the same key. */
    private static final class SumCounts implements Function2<Integer, Integer, Integer> {
        @Override
        public Integer call(Integer left, Integer right) throws Exception {
            return left + right;
        }
    }

    /**
     * Builds the streaming job, starts it, and blocks until it terminates.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the thread is interrupted while awaiting termination
     */
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("SparkStreaming");

        // Batch (collection) interval: 3 seconds.
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(sparkConf, Durations.seconds(3));

        // Input DStream: text lines read from a socket.
        JavaReceiverInputDStream<String> socketLines =
                streamingContext.socketTextStream("localhost", 9999);

        // The window length should be an integer multiple of the batch interval.
        // A window can slide; by default it slides by one batch interval, which means the
        // same data may be computed more than once. To avoid that, the slide step is set
        // explicitly here to the same length as the window.
        JavaPairDStream<String, Integer> countsPerWindow = socketLines
                .mapToPair(new LineToOne())
                .window(Durations.seconds(6), Durations.seconds(6))
                .reduceByKey(new SumCounts());

        countsPerWindow.print();

        // 1. Start the receiver.
        streamingContext.start();
        // 2. Wait for the receiver to shut down.
        streamingContext.awaitTermination();
    }
}
